[CDK] クロスアカウントでSNSからKinesisDataFirehoseに配信する
AWSアカウントを跨いでSNSからKinesisDataFirehoseにメッセージ配信する構成を紹介します。
やりたいこと
構成図をMermaidで書いてNotionで描画しています:
flowchart LR subgraph AWSアカウントB firehose(KinesisDataFirehose配信ストリーム)--バッファリングして出力-->s3(S3バケット) end subgraph AWSアカウントA sns(SNSトピック)--配信-->firehose end
AWSアカウントAからAWSアカウントBに対してSNSを介したデータ連携を行い、アカウントB側のS3バケットにメッセージを溜めておきたいとします。この要件は SNS-->Lambda-->S3バケット
でも実現可能ですが、Lambdaの代わりにFirehoseを使うことでアプリケーションコードを1行も書かずに済むようになります。
本記事ではこの構成をCDKで構築していきたいと思います。
環境
- node 16.15.0
- typescript 4.7.4
- aws-cdk-lib 2.46.0
- constructs 10.1.43
やり方
[AWSアカウントA] SNSトピックを作成
まず、AWSアカウントAでSNSトピックを作成します。また、AWSアカウントBのリソースが本トピックを購読できるようにアクセスポリシーを追加します。
import { Construct } from "constructs" import * as cdk from "aws-cdk-lib" export class SnsStack extends cdk.Stack { constructor(scope: Construct, id: string, props?: cdk.StackProps) { super(scope, id, props) // SNSトピック const topic = new cdk.aws_sns.Topic(this, "topic", {}) // AWSアカウントBのFirehoseを許可するアクセスポリシーを設定 topic.addToResourcePolicy( new cdk.aws_iam.PolicyStatement({ effect: cdk.aws_iam.Effect.ALLOW, principals: [ new cdk.aws_iam.AccountPrincipal("{AWS_ACCOUNT_B_ID}"), ], actions: ["SNS:Subscribe"], resources: [topic.topicArn], }) ) } }
上記デプロイするとアクセスポリシーに以下のJSONが設定されます。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "0", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{AWS_ACCOUNT_ID_B}:root" }, "Action": "SNS:Subscribe", "Resource": "arn:aws:sns:ap-northeast-1:{AWS_ACCOUNT_ID_A}:{SNS_TOPIC_NAME}" } ] }
[AWSアカウントB] SNSサブスクリプション+Firehoseストリーム+S3バケットを作成
続いて、AWSアカウントBでSNSサブスクリプション+Firehoseストリーム+S3バケット等々を作成します。
import { Construct } from "constructs" import * as cdk from "aws-cdk-lib" export class KinesisFirehoseStack extends cdk.Stack { constructor(scope: Construct, id: string, props?: cdk.StackProps) { super(scope, id, props) const region = cdk.Stack.of(this).region const accountId = cdk.Stack.of(this).account // S3バケット const streamDestinationBucket = new cdk.aws_s3.Bucket( this, "streamDestinationBucket", { bucketName: `stream-destination-bucket-${region}-${accountId}`, removalPolicy: cdk.RemovalPolicy.DESTROY, blockPublicAccess: cdk.aws_s3.BlockPublicAccess.BLOCK_ALL, encryption: cdk.aws_s3.BucketEncryption.S3_MANAGED, } ) // KinesisFirehose配信ストリーム用ロググループ const deliveryStreamFailLogGroup = new cdk.aws_logs.LogGroup( this, "deliveryStreamFailLogGroup", { logGroupName: `/aws/kinesisfirehose/sample-stream-fail-log`, } ) // KinesisFirehose配信ストリーム用ログストリーム const deliveryStreamLogStream = new cdk.aws_logs.LogStream( this, "deliveryStreamLogStream", { logGroup: deliveryStreamFailLogGroup, logStreamName: "logs", } ) // KinesisFirehose配信ストリーム用ロール const deliveryStreamRole = new cdk.aws_iam.Role( this, "deliveryStreamRole", { assumedBy: new cdk.aws_iam.ServicePrincipal("firehose.amazonaws.com"), } ) deliveryStreamRole.addToPolicy( new cdk.aws_iam.PolicyStatement({ actions: [ "kinesis:DescribeStream", "kinesis:GetShardIterator", "kinesis:GetRecords", ], effect: cdk.aws_iam.Effect.ALLOW, resources: [`arn:aws:kinesis:${region}:${accountId}:stream/*`], }) ) deliveryStreamRole.addToPolicy( new cdk.aws_iam.PolicyStatement({ actions: [ "s3:AbortMultipartUpload", "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:PutObject", ], effect: cdk.aws_iam.Effect.ALLOW, resources: [ streamDestinationBucket.bucketArn, `${streamDestinationBucket.bucketArn}/*`, ], }) ) deliveryStreamRole.addToPolicy( new cdk.aws_iam.PolicyStatement({ actions: ["logs:PutLogEvents"], effect: cdk.aws_iam.Effect.ALLOW, resources: [ `arn:aws:logs:${region}:${accountId}:log-group:/aws/kinesisfirehose/*`, ], }) ) // KinesisFirehose配信ストリーム const deliveryStream = new cdk.aws_kinesisfirehose.CfnDeliveryStream( this, "deliveryStream", { deliveryStreamName: "deliveryStream", deliveryStreamType: "DirectPut", s3DestinationConfiguration: { bucketArn: streamDestinationBucket.bucketArn, roleArn: deliveryStreamRole.roleArn, //S3出力失敗時のログ記録設定 cloudWatchLoggingOptions: { enabled: true, logGroupName: deliveryStreamFailLogGroup.logGroupName, logStreamName: "logs", }, compressionFormat: "GZIP", prefix: "items", errorOutputPrefix: "errorOutput", bufferingHints: { intervalInSeconds: 60, }, }, } ) deliveryStream.addDependsOn( streamDestinationBucket.node.defaultChild as cdk.CfnResource ) deliveryStream.addDependsOn( deliveryStreamFailLogGroup.node.defaultChild as cdk.CfnResource ) deliveryStream.addDependsOn( deliveryStreamLogStream.node.defaultChild as cdk.CfnResource ) // SNSサブスクリプション用ロール const subscriptionRole = new cdk.aws_iam.Role(this, "subscriptionRole", { assumedBy: new cdk.aws_iam.ServicePrincipal("sns.amazonaws.com"), }) subscriptionRole.addToPolicy( new cdk.aws_iam.PolicyStatement({ actions: ["firehose:PutRecord"], effect: cdk.aws_iam.Effect.ALLOW, resources: [deliveryStream.attrArn], }) ) // AWSアカウントAのSNSトピックARN const topicArn = "arn:aws:sns:ap-northeast-1:{AWS_ACCOUNT_A_ID}:{SNS_TOPIC_NAME}" // SNSサブスクリプション const subscription = new cdk.aws_sns.CfnSubscription(this, "subscription", { topicArn: topicArn, // AWSアカウントAのSNSトピックと紐付ける protocol: "firehose", endpoint: deliveryStream.attrArn, subscriptionRoleArn: subscriptionRole.roleArn, }) subscription.addDependsOn(deliveryStream) subscription.addDependsOn( subscriptionRole.node.defaultChild as cdk.CfnResource ) } }
これでアカウントAのSNSからアカウントBのFirehoseにメッセージ配信できる構成となります。
デバッグ時はSNS配信ステータスのログ記録をONにすると作業しやすい
検証時はSNSトピックの「配信ステータスのログ記録」をONにするとメッセージ配信の成否、そして失敗時はその理由を確認できるようになるため、非常に作業しやすくなります。
ただし、CloudFormation/CDKではまだこの設定を行うことができないようです(2022年11月)。
- aws-sns: Support setting of delivery status logging with the CDK · Issue #21971 · aws/aws-cdk
- [sns] add sns service trust to key when subscribing to an encrypted queue · Issue #2504 · aws/aws-cdk
- AWS::SNS::Topic - DeliveryStatusLogging (new property) · Issue #66 · aws-cloudformation/cloudformation-coverage-roadmap
そのためAWSマネコンから設定します。
Amazon SNSのSMS配信のロギングを有効にしてみた | DevelopersIO
SNSトピックの編集ボタンを押下し、
- 配信ステータスのログ記録
- これらのプロトコルの配信ステータスをログに記録します:「Amazon Kinesis Data Firehose」にチェック
- 成功サンプルレート: 「100」%
- サービスロール: 新しいサービスロールの作成
と設定して保存します。
あとはSNSトピックでテスト用のメッセージを発行すると、CloudWatchロググループ sns/ap-northeast-1/{AWS_ACCOUNT_ID}/{SNS_TOPIC_NAME}
にログ出力がなされるようになります。
以上、参考になれば幸いです。
参考
- [AWS CDK] Kinesis Data FirehoseのデータのS3出力失敗時のログを記録してみた | DevelopersIO
- [CDK] SNSメッセージをKinesisFirehoseでバッファリングしてS3へ出力する
- クロスアカウントなLambdaをSNS TopicにSubscribeする | DevelopersIO
- How to subscribe Kinesis Data Firehose to SNS in other account - DEV Community ????
- チュートリアル: Amazon Simple Notification Service での AWS Lambda の使用 - AWS Lambda